This document provides a comprehensive analysis of how Apache Spark's Kafka connector decides offset ranges for each partition and determines the total number of rows to process. The analysis is based on the source code from the Kafka 0.10+ SQL connector.
graph TB
subgraph "Spark Driver"
A[KafkaSource/KafkaMicroBatchStream
📊 Query entry point
🔄 Offset management
⏱️ Batch coordination
📈 Progress tracking]
B[KafkaOffsetReader
📡 Fetch latest/earliest offsets
🕐 Timestamp-based lookup
🔍 Partition discovery
⚡ Admin/Consumer API calls]
C[KafkaOffsetRangeCalculator
✂️ Range splitting logic
📊 Partition count calculation
⚖️ Load balancing
📍 Preferred location assignment]
end
subgraph "Spark Executors"
D[KafkaSourceRDD
🏗️ RDD partition creation
📍 Preferred location assignment
⚙️ Compute method implementation
🔄 Iterator creation]
E[KafkaDataConsumer
📥 Low-level record fetching
🚨 Data loss detection
📊 Metrics tracking
🔄 Consumer pool management]
end
subgraph "Kafka Cluster"
F[Kafka Brokers
📚 Topic partitions
📝 Offset metadata
💾 Message storage
🔄 Replication]
end
A --> B
B --> C
C --> D
D --> E
E --> F
B --> F
style A fill:#e1f5fe
style B fill:#f3e5f5
style C fill:#e8f5e8
style D fill:#fff3e0
style E fill:#fce4ec
style F fill:#f1f8e9
The diagram above illustrates the complete architecture flow of Spark's Kafka offset management system:
Driver Components (Blue Section):
Executor Components (Orange Section):
External System (Green Section):
The arrows show the flow: Driver components plan the work, Executors execute it, and both interact with Kafka for different purposes (metadata vs data).
graph TB
subgraph "Configuration Parameters"
A[minPartitions
🎯 Minimum Spark partitions
Default: None
Purpose: Ensure parallelism]
B[maxRecordsPerPartition
📏 Max records per partition
Default: None
Purpose: Memory management]
C[maxOffsetsPerTrigger
🚦 Rate limiting for streaming
Default: None
Purpose: Batch size control]
D[failOnDataLoss
⚠️ Error handling behavior
Default: true
Purpose: Data consistency]
end
subgraph "Impact on Processing"
E[Partition Count
📊 Number of Spark tasks
🔄 Parallelism level
⚡ Resource utilization]
F[Memory Usage
💾 Per-partition memory
🗂️ Buffer requirements
🔄 GC pressure]
G[Throughput
📈 Records per second
⏱️ Latency characteristics
🔄 Backpressure handling]
H[Fault Tolerance
🛡️ Error recovery
📊 Data loss handling
🔄 Retry behavior]
end
A --> E
B --> F
C --> G
D --> H
style A fill:#e3f2fd
style B fill:#e8f5e8
style C fill:#fff3e0
style D fill:#ffebee
This diagram shows how configuration parameters directly affect processing characteristics:
Configuration Parameters (Left Side):
Processing Impact (Right Side):
Each configuration parameter affects different aspects of performance:
flowchart TD
A[🎯 Input: Map of TopicPartition → KafkaOffsetRange
📊 Example: orders-0: 1000→151000 150k records
📊 orders-1: 2000→82000 80k records
📊 orders-2: 5000→35000 30k records] --> B[🔍 Filter ranges where size > 0
✅ Valid ranges only
❌ Skip empty ranges]
B --> C{📏 maxRecordsPerPartition set?
🎯 Memory management check
⚖️ Prevent oversized partitions}
C -->|✅ YES| D[✂️ Split ranges exceeding maxRecords
📊 orders-0: 150k > 50k → Split needed
📊 orders-1: 80k > 50k → Split needed
📊 orders-2: 30k < 50k → Keep as-is]
C -->|❌ NO| E[📦 Keep original ranges
1:1 Kafka → Spark mapping]
D --> F[🧮 Calculate: parts = ceil(size / maxRecords)
📊 orders-0: ceil(150k/50k) = 3 parts
📊 orders-1: ceil(80k/50k) = 2 parts
📊 orders-2: 1 part unchanged]
F --> G[✂️ Apply getDividedPartition method
🔄 Integer division with remainder handling
📊 Ensure equal distribution]
G --> H[🔄 Update ranges with split results
📊 orders-0: 3 ranges (50k, 50k, 50k)
📊 orders-1: 2 ranges (40k, 40k)
📊 orders-2: 1 range (30k)
📊 Total: 6 partitions]
E --> I{🎯 Current partitions < minPartitions?
⚖️ Parallelism requirement check
📊 Target partition count}
H --> I
I -->|❌ NO| J[✅ Use current partition set
📊 Sufficient parallelism
🎯 Meet requirements]
I -->|✅ YES| K[📊 Calculate total size and distribution
🧮 Total: 260k records across 6 partitions
🎯 Need: 8 partitions (minPartitions)
📊 Missing: 2 partitions]
K --> L[🔍 Identify partitions to split vs keep
📊 Large partitions: orders-0 ranges (50k each)
📊 Small partitions: orders-2 (30k)
⚖️ Split large, keep small]
L --> M[✂️ Apply proportional splitting
📊 Split largest orders-0 ranges
🔄 Create additional partitions
⚖️ Balance load distribution]
M --> N[🔄 Merge split and unsplit partitions
📊 Final count: 8 partitions
✅ Meet minPartitions requirement]
J --> O[📍 Assign preferred executor locations
🏷️ Hash-based distribution
🔄 Enable consumer reuse
⚡ Optimize performance]
N --> O
O --> P[🎯 Return final KafkaOffsetRange array
📊 Complete partition specification
📍 Executor assignments
✅ Ready for execution]
style A fill:#e3f2fd
style D fill:#e8f5e8
style F fill:#fff3e0
style K fill:#ffebee
style P fill:#e8f5e8
This flowchart illustrates the step-by-step process of how Spark calculates offset ranges:
Step 1 - Input Processing:
Imagine you have a pizza delivery business with 3 delivery areas (Kafka partitions). Each area has a different number of orders waiting:
Step 2 - Memory Management Check:
You decide each delivery driver can handle at most 50,000 orders (maxRecordsPerPartition = 50k). This prevents any single driver from being overwhelmed.
Step 3 - Splitting Oversized Areas:
Step 4 - Parallelism Check:
You want at least 8 drivers working (minPartitions = 8) for efficiency, but you only have 6. So you need 2 more drivers.
Step 5 - Additional Splitting:
Take the largest remaining chunks (the 50k order areas) and split them further:
Step 6 - Driver Assignment:
Assign each driver to a specific delivery truck (executor) using a consistent method (hashing). This ensures the same driver handles the same area consistently, which improves efficiency through route familiarity (consumer reuse).
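The splitting logic from Steps 2 and 3 can be sketched in a few lines of Python. This is a simplified model of what `KafkaOffsetRangeCalculator.getDividedPartition` does (even pieces via integer division with remainder handling), not the actual Scala source:

```python
import math

def split_range(from_offset, until_offset, max_records):
    """Split one Kafka offset range into ceil(size / max_records)
    roughly equal pieces, handing out the remainder one offset at a
    time so no piece differs from another by more than one record."""
    size = until_offset - from_offset
    parts = max(1, math.ceil(size / max_records))
    base, rem = divmod(size, parts)
    ranges, start = [], from_offset
    for i in range(parts):
        end = start + base + (1 if i < rem else 0)
        ranges.append((start, end))
        start = end
    return ranges

# orders-0: 150k records, 50k cap -> three equal 50k pieces
print(split_range(1000, 151000, 50_000))
# orders-1: 80k records, 50k cap -> two equal 40k pieces (not 50k + 30k)
print(split_range(2000, 82000, 50_000))
```

Note that orders-1 becomes two 40k pieces rather than 50k + 30k: the calculator divides evenly, which is why the flowchart shows balanced splits.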
Let's trace through a complete example with visual representation:
graph TB
subgraph "🏪 Kafka Cluster State"
subgraph "📋 orders topic"
O1[📦 partition-0
📊 Range: 1000 → 151000
📏 Size: 150,000 records
⏰ Latest: 151000]
O2[📦 partition-1
📊 Range: 2000 → 82000
📏 Size: 80,000 records
⏰ Latest: 82000]
O3[📦 partition-2
📊 Range: 5000 → 35000
📏 Size: 30,000 records
⏰ Latest: 35000]
end
subgraph "💳 payments topic"
P1[📦 partition-0
📊 Range: 100 → 40100
📏 Size: 40,000 records
⏰ Latest: 40100]
end
end
subgraph "⚙️ Configuration"
C1[🎯 minPartitions = 8
📏 maxRecordsPerPartition = 50,000
🚦 Ensure parallelism & memory limits]
end
style O1 fill:#ffcdd2
style O2 fill:#f8bbd9
style O3 fill:#e1bee7
style P1 fill:#c8e6c9
Think of this as a warehouse inventory system:
Configuration: We want at least 8 workers (minPartitions) and no worker should handle more than 50,000 items (maxRecordsPerPartition).
graph TB
subgraph "🔍 Step 1: Check Record Limits"
A1[📦 orders-0: 150k records
❌ Exceeds 50k limit
✂️ Needs splitting]
A2[📦 orders-1: 80k records
❌ Exceeds 50k limit
✂️ Needs splitting]
A3[📦 orders-2: 30k records
✅ Within 50k limit
📦 Keep as-is]
A4[📦 payments-0: 40k records
✅ Within 50k limit
📦 Keep as-is]
end
subgraph "✂️ Splitting Logic"
B1[🧮 orders-0 split calculation
📊 ceil(150k/50k) = 3 parts
📏 50k + 50k + 50k]
B2[🧮 orders-1 split calculation
📊 ceil(80k/50k) = 2 parts
📏 40k + 40k]
end
subgraph "📊 Results After Step 1"
C1[📦 orders-0-0: 1000→51000 📏 50k
📦 orders-0-1: 51000→101000 📏 50k
📦 orders-0-2: 101000→151000 📏 50k]
C2[📦 orders-1-0: 2000→42000 📏 40k
📦 orders-1-1: 42000→82000 📏 40k]
C3[📦 orders-2-0: 5000→35000 📏 30k]
C4[📦 payments-0-0: 100→40100 📏 40k]
C5[📊 Total: 7 partitions
🎯 Target: 8 partitions
📊 Missing: 1 partition]
end
A1 --> B1
A2 --> B2
B1 --> C1
B2 --> C2
A3 --> C3
A4 --> C4
style A1 fill:#ffcdd2
style A2 fill:#f8bbd9
style A3 fill:#c8e6c9
style A4 fill:#dcedc8
style C5 fill:#fff3e0
This is like organizing a large warehouse shipping operation:
Initial Assessment:
Splitting Strategy:
Result: We now have 7 workers, but our target is 8 for optimal parallelism.
graph TB
subgraph "🎯 Step 2: Ensure Minimum Partitions"
A[📊 Current: 7 partitions
🎯 Required: 8 partitions
📊 Gap: 1 partition needed]
B[📊 Total records: 300k
📊 Average per partition: 37.5k
⚖️ Load balancing analysis]
end
subgraph "🔍 Partition Analysis"
C[📦 orders-0-0: 50k ⭐ Largest
📦 orders-0-1: 50k ⭐ Largest
📦 orders-0-2: 50k ⭐ Largest
📦 orders-1-0: 40k 📊 Medium
📦 orders-1-1: 40k 📊 Medium
📦 orders-2-0: 30k 📊 Small
📦 payments-0-0: 40k 📊 Medium]
end
subgraph "✂️ Additional Splitting"
D[🎯 Select orders-0-0 for splitting
📊 Split 50k into 2 parts
📏 25k + 25k distribution]
E[🧮 New ranges:
📦 orders-0-0a: 1000→26000 📏 25k
📦 orders-0-0b: 26000→51000 📏 25k]
end
subgraph "✅ Final Result"
F[📊 Total: 8 partitions
🎯 Meets minPartitions requirement
📏 Balanced load distribution
✅ Ready for execution]
end
A --> B
B --> C
C --> D
D --> E
E --> F
style A fill:#e3f2fd
style D fill:#e8f5e8
style F fill:#c8e6c9
This is like adding one more worker to achieve optimal team size:
Gap Analysis:
We have 7 workers but need 8 for optimal efficiency. We need to split one more partition.
Selection Strategy:
Among all current partitions, we look for the largest ones that can be split without creating too much imbalance:
Splitting Decision:
We choose to split one of the 50k partitions (orders-0-0) because:
Final Team:
Now we have 8 workers with loads ranging from 25k to 50k items - much more balanced than the original 30k to 150k range.
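The minPartitions top-up can be sketched as follows. This is a deliberately simplified model: it just halves the largest remaining range until the target count is met, whereas Spark's calculator splits proportionally to each range's share of the total size. Topic identity is also ignored here for brevity:

```python
import heapq

def ensure_min_partitions(ranges, min_partitions):
    """ranges: list of (from_offset, until_offset) tuples.
    Repeatedly halve the largest range until at least
    min_partitions ranges exist."""
    # max-heap keyed on range size (sizes negated for heapq's min-heap)
    heap = [(-(u - f), f, u) for f, u in ranges]
    heapq.heapify(heap)
    while len(heap) < min_partitions:
        _, f, u = heapq.heappop(heap)   # largest range
        mid = f + (u - f) // 2
        heapq.heappush(heap, (-(mid - f), f, mid))
        heapq.heappush(heap, (-(u - mid), mid, u))
    return sorted((f, u) for _, f, u in heap)

# The 7 ranges from Step 1; target is 8 partitions
step1 = [(1000, 51000), (51000, 101000), (101000, 151000),
         (2000, 42000), (42000, 82000), (5000, 35000), (100, 40100)]
final = ensure_min_partitions(step1, 8)
```

Running this splits the first 50k range (1000→51000) into 25k halves, reproducing the 8-partition layout above with sizes between 25k and 50k.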
graph TB
subgraph "🎯 Final Spark Partitions Layout"
subgraph "🖥️ Executor 1 (Hash: orders-0)"
E1P1[📦 Partition 0
📊 orders-0: 1000→26000
📏 25,000 records
⏱️ Est. 2.5 min]
E1P2[📦 Partition 1
📊 orders-0: 26000→51000
📏 25,000 records
⏱️ Est. 2.5 min]
E1P3[📦 Partition 2
📊 orders-0: 51000→101000
📏 50,000 records
⏱️ Est. 5 min]
end
subgraph "🖥️ Executor 2 (Hash: orders-1)"
E2P1[📦 Partition 3
📊 orders-0: 101000→151000
📏 50,000 records
⏱️ Est. 5 min]
E2P2[📦 Partition 4
📊 orders-1: 2000→42000
📏 40,000 records
⏱️ Est. 4 min]
end
subgraph "🖥️ Executor 3 (Hash: orders-2) payments-0)"
E3P1[📦 Partition 5
📊 orders-1: 42000→82000
📏 40,000 records
⏱️ Est. 4 min]
E3P2[📦 Partition 6
📊 orders-2: 5000→35000
📏 30,000 records
⏱️ Est. 3 min]
E3P3[📦 Partition 7
📊 payments-0: 100→40100
📏 40,000 records
⏱️ Est. 4 min]
end
end
subgraph "📊 Performance Metrics"
M1[⚖️ Load Balance: Good
📊 Max: 50k, Min: 25k
📊 Ratio: 2:1 (acceptable)]
M2[🎯 Parallelism: Optimal
📊 8 partitions across 3 executors
⚡ Full resource utilization]
M3[💾 Memory Usage: Controlled
📊 Max 50k × 1KB = 50MB per partition
🔄 GC pressure minimal]
end
style E1P1 fill:#ffcdd2
style E1P2 fill:#f8bbd9
style E1P3 fill:#e1bee7
style E2P1 fill:#d1c4e9
style E2P2 fill:#c5cae9
style E3P1 fill:#bbdefb
style E3P2 fill:#b3e5fc
style E3P3 fill:#b2dfdb
This diagram shows the final "work assignment" across the computing cluster:
Executor Assignment (Like Warehouse Locations):
Executor 1: Gets all orders-0 related partitions (partitions 0, 1, 2)
Executor 2: Gets remaining orders-0 and some orders-1 (partitions 3, 4)
Executor 3: Gets remaining orders-1, orders-2, and payments-0 (partitions 5, 6, 7)
Performance Characteristics:
Estimated Processing Time:
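The per-partition time estimates in the layout above follow from a simple linear-rate assumption. The rate of 10,000 records per minute per task is purely illustrative (it matches the diagram's numbers but is not something the connector computes):

```python
def estimated_minutes(record_count, records_per_minute=10_000):
    """Illustrative only: task duration scales linearly with range size."""
    return record_count / records_per_minute

print(estimated_minutes(25_000))  # Partition 0: 2.5 min
print(estimated_minutes(50_000))  # Partition 2: 5.0 min
print(estimated_minutes(40_000))  # Partition 4: 4.0 min
```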
graph TB
subgraph "📊 Estimation Phase (Planning)"
A[🧮 KafkaOffsetRange.size
📊 = untilOffset - fromOffset
📊 orders-0: 151000-1000 = 150k
🎯 Used for splitting decisions]
B[⚠️ Assumptions Made
📊 1 offset = 1 record
📊 No transaction metadata
📊 No log compaction
📊 No aborted transactions]
C[📈 Potential Overestimation
📊 Transaction control records
📊 Aborted messages
📊 Compacted duplicates
📊 Actual < Estimated]
end
subgraph "🔍 Actual Counting Phase (Execution)"
D[📥 KafkaDataConsumer.get
🔄 Iterates through actual records
📊 Skips metadata records
📊 Handles isolation levels]
E[📊 Record Type Filtering
✅ Data records → Count
❌ Control records → Skip
❌ Aborted records → Skip
📊 Track totalRecordsRead]
F[📈 Actual Count Tracking
📊 totalRecordsRead: Real count
📊 numRecordsPolled: Raw count
📊 numPolls: API calls
📊 Accurate measurement]
end
A --> B
B --> C
D --> E
E --> F
C --> G[📊 Example Gap
📊 Estimated: 150,000 records
📊 Actual: 147,500 records
📊 Difference: 2,500 (1.7%)]
F --> H[✅ Accurate Results
📊 Processable records only
📊 Consistent with semantics
📊 Ready for downstream]
style A fill:#e3f2fd
style D fill:#e8f5e8
style G fill:#fff3e0
style H fill:#c8e6c9
This illustrates the difference between "estimated" and "actual" record counts, like the difference between a restaurant's seating capacity and actual customers served:
Estimation Phase (Planning - Left Side):
Think of this like a restaurant manager planning for the evening:
Reality Check (Execution - Right Side):
When the restaurant actually opens:
Why the Difference?
Practical Impact:
This two-phase approach allows Spark to make good planning decisions quickly while still providing accurate final counts.
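The planning-side estimate is pure offset arithmetic. The subtraction below mirrors `KafkaOffsetRange.size`; the actual count of 147,500 is the figure from the example gap in the diagram above, not something computed here:

```python
def estimated_count(from_offset, until_offset):
    """Planning phase: assume every offset is one processable record."""
    return until_offset - from_offset

est = estimated_count(1000, 151000)   # orders-0: 150,000 offsets
actual = 147_500                      # records the executor actually read
gap_pct = 100 * (est - actual) / est  # overestimate from control/aborted records
```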
graph TB
subgraph "📦 Raw Kafka Records Stream"
A[📄 Data Record 1
📊 offset: 1000
💾 payload: order_data
✅ Include in count]
B[🔄 Control Record
📊 offset: 1001
💾 payload: begin_txn
❌ Skip, don't count]
C[📄 Data Record 2
📊 offset: 1002
💾 payload: order_data
✅ Include in count]
D[📄 Data Record 3
📊 offset: 1003
💾 payload: order_data
❌ Aborted, don't count]
E[🔄 Control Record
📊 offset: 1004
💾 payload: abort_txn
❌ Skip, don't count]
F[📄 Data Record 4
📊 offset: 1005
💾 payload: order_data
✅ Include in count]
end
subgraph "🔍 Consumer Processing Logic"
G[📥 KafkaDataConsumer.get
🔄 Process each record
📊 Check record type
📊 Apply isolation level]
H{📊 Record Type Check
🔍 Data vs Control
📊 Transaction state}
I[📊 Isolation Level Check
🔍 read_committed level
📊 Transaction status
✅ Committed only]
end
subgraph "📊 Counting Results"
J[📊 totalRecordsRead: 3
📊 numRecordsPolled: 6
📊 numPolls: 2
📊 Efficiency: 50%]
K[📈 Metrics Tracking
📊 Processing rate
📊 Filtering overhead
📊 Consumer efficiency]
end
A --> G
B --> G
C --> G
D --> G
E --> G
F --> G
G --> H
H --> I
I --> J
J --> K
style A fill:#c8e6c9
style B fill:#ffcdd2
style C fill:#c8e6c9
style D fill:#ffcdd2
style E fill:#ffcdd2
style F fill:#c8e6c9
style J fill:#e3f2fd
This diagram shows how Kafka's transaction system affects record counting, like filtering valid vs invalid items on a production line:
Raw Kafka Stream (Top Section):
Imagine a manufacturing conveyor belt with different types of items:
Processing Logic (Middle Section):
Like a quality control inspector:
Specific Example Walk-through:
Final Results:
Why This Matters:
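The six-record walk-through above can be reproduced in a few lines. This is a hedged model of the consumer-side filtering; the dictionary fields here are illustrative stand-ins, not the Kafka client API:

```python
# Offsets 1000-1005 from the diagram: data, control (begin_txn), data,
# aborted data, control (abort_txn), data.
records = [
    {"offset": 1000, "control": False, "aborted": False},
    {"offset": 1001, "control": True,  "aborted": False},  # begin_txn marker
    {"offset": 1002, "control": False, "aborted": False},
    {"offset": 1003, "control": False, "aborted": True},   # rolled back
    {"offset": 1004, "control": True,  "aborted": False},  # abort_txn marker
    {"offset": 1005, "control": False, "aborted": False},
]

# Under read_committed, only non-control records from committed
# transactions are surfaced to Spark.
num_records_polled = len(records)
total_records_read = sum(1 for r in records
                         if not r["control"] and not r["aborted"])
efficiency = total_records_read / num_records_polled
```

This reproduces the counting results in the diagram: 6 offsets polled, 3 records counted, 50% efficiency.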
sequenceDiagram
participant RDD as 🎯 KafkaSourceRDD
participant Consumer as 📥 KafkaDataConsumer
participant Kafka as 🏪 Kafka Cluster
participant Config as ⚙️ Configuration
Note over RDD,Kafka: 🔄 Normal Processing Flow
RDD->>Consumer: 📊 get(offset=1000)
Consumer->>Kafka: 📡 fetch(offset=1000)
Kafka-->>Consumer: ✅ Records from offset 1000
Consumer-->>RDD: 📊 Return records
Note over RDD,Kafka: ⚠️ Data Loss Scenario
RDD->>Consumer: 📊 get(offset=1000)
Consumer->>Kafka: 📡 fetch(offset=1000)
Kafka-->>Consumer: ❌ Error: Offset 1000 not available
📊 Earliest: 1200
📊 Data aged out (200 records lost)
Consumer->>Config: 🔍 Check failOnDataLoss setting
alt 🚨 failOnDataLoss=true
Config-->>Consumer: ✅ Strict mode enabled
Consumer->>RDD: 💥 Throw OffsetOutOfRangeException
📊 Lost records: 200
📊 Range: 1000-1199
RDD->>RDD: 🛑 Query fails immediately
📊 Ensure data consistency
📊 Manual intervention required
else 🔄 failOnDataLoss=false
Config-->>Consumer: ⚠️ Tolerant mode enabled
Consumer->>Consumer: 📝 Log WARNING about data loss
📊 Lost records: 200
📊 Adjusting start offset to 1200
Consumer->>Kafka: 📡 fetch(offset=1200)
Kafka-->>Consumer: ✅ Records from offset 1200
Consumer-->>RDD: 📊 Return records (fewer than expected)
📊 Actual records: 1800
📊 Expected records: 2000
end
Note over RDD,Kafka: 📊 Metrics Update
Consumer->>Consumer: 📊 Update metrics
📊 dataLossDetected: true
📊 recordsLost: 200
📊 adjustedStartOffset: 1200
This sequence diagram illustrates how Spark handles data loss scenarios, like dealing with missing pages in a book:
Normal Flow (Happy Path):
Data Loss Scenario (Problem):
Two Response Strategies:
Strict Mode (failOnDataLoss=true):
Like a strict academic policy:
Tolerant Mode (failOnDataLoss=false):
Like a flexible academic policy:
Practical Business Impact:
Metrics and Monitoring:
The system tracks:
This allows operators to understand the impact and make informed decisions about data quality.
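The branching behavior in the sequence diagram can be sketched as a single decision function. This is a hedged model of the semantics, not Spark's actual code path or exception type:

```python
def resolve_start_offset(requested, earliest, fail_on_data_loss):
    """When the requested offset has aged out of retention, either fail
    fast (strict mode) or skip ahead to the earliest available offset
    (tolerant mode). Returns (start_offset, records_lost)."""
    if requested >= earliest:
        return requested, 0          # happy path: offset still available
    lost = earliest - requested
    if fail_on_data_loss:
        raise RuntimeError(
            f"Offsets {requested}-{earliest - 1} aged out "
            f"({lost} records lost); manual intervention required")
    print(f"WARNING: skipping {lost} lost records, starting at {earliest}")
    return earliest, lost

# Tolerant mode with the diagram's numbers: start moves 1000 -> 1200
print(resolve_start_offset(1000, 1200, fail_on_data_loss=False))
```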
graph TB
subgraph "🎯 Phase 1: Query Planning (Driver)"
A[📊 KafkaSource.initialOffset
⏱️ Time: 50ms
📊 Memory: 10MB
🔄 API calls: 5]
B[📡 KafkaOffsetReader.fetchLatestOffsets
⏱️ Time: 200ms
📊 Network: 15 requests
🔄 Partitions discovered: 11]
C[🧮 KafkaOffsetRangeCalculator.getRanges
⏱️ Time: 30ms
📊 CPU: Light
🔄 Ranges calculated: 8]
end
subgraph "🏗️ Phase 2: RDD Creation (Driver)"
D[📦 KafkaSourceRDD.createPartitions
⏱️ Time: 20ms
📊 Memory: 5MB
🔄 Partitions: 8]
E[📍 Preferred location assignment
⏱️ Time: 10ms
📊 Hash calculations: 8
🔄 Executors: 3]
end
subgraph "⚡ Phase 3: Task Execution (Executors)"
F[🖥️ Executor 1: 3 tasks
⏱️ Time: 5 min
📊 Memory: 150MB
🔄 Records: 125k]
G[🖥️ Executor 2: 2 tasks
⏱️ Time: 4.5 min
📊 Memory: 100MB
🔄 Records: 90k]
H[🖥️ Executor 3: 3 tasks
⏱️ Time: 4 min
📊 Memory: 110MB
🔄 Records: 110k]
end
subgraph "📊 Phase 4: Consumer Operations (Executors)"
I[📥 KafkaDataConsumer operations
⏱️ Avg fetch time: 15ms
📊 Throughput: 10k rec/sec
🔄 Consumer reuse: 85%]
J[📈 Record processing
⏱️ Processing rate: 8k rec/sec
📊 Filtering overhead: 5%
🔄 Memory efficiency: 90%]
end
subgraph "🎯 Phase 5: Results Aggregation (Driver)"
K[📊 Batch completion
⏱️ Total time: 5.5 min
📊 Total records: 325k
🔄 Success rate: 99.5%]
L[📈 Performance metrics
📊 Throughput: 985 rec/sec
📊 Latency: P99 < 100ms
🔄 Resource utilization: 78%]
end
A --> B
B --> C
C --> D
D --> E
E --> F
E --> G
E --> H
F --> I
G --> I
H --> I
I --> J
J --> K
K --> L
style A fill:#e3f2fd
style D fill:#e8f5e8
style F fill:#fff3e0
style I fill:#fce4ec
style K fill:#c8e6c9
This diagram shows the end-to-end processing pipeline, like a restaurant operation from menu planning to customer service:
Phase 1: Query Planning (Driver - Like Restaurant Management):
KafkaSource.initialOffset: Head manager decides what to serve today
KafkaOffsetReader.fetchLatestOffsets: Check what's available in the kitchen
KafkaOffsetRangeCalculator.getRanges: Plan the work distribution
Phase 2: RDD Creation (Driver - Like Kitchen Setup):
KafkaSourceRDD.createPartitions: Set up cooking stations
Preferred location assignment: Assign chefs to stations
Phase 3: Task Execution (Executors - Like Kitchen Teams):
Executor 1: Team of 3 cooking stations
Executor 2: Team of 2 cooking stations
Executor 3: Team of 3 cooking stations
Phase 4: Consumer Operations (Executors - Like Food Preparation):
KafkaDataConsumer operations: Actual cooking process
Record processing: Final dish preparation
Phase 5: Results Aggregation (Driver - Like Restaurant Summary):
Batch completion: End of service summary
Performance metrics: Management dashboard
Key Insights:
graph TB
subgraph "🏗️ Consumer Pool Architecture"
subgraph "🖥️ Executor 1"
A[📥 Consumer Pool
🔄 LRU Cache: 16 consumers
📊 Hit rate: 85%
⏱️ Avg lifetime: 30 min]
B[📦 orders-0 → Consumer1
🔄 Reused across batches
📊 Connection: Persistent
⏱️ Last used: 2 min ago]
C[📦 orders-1 → Consumer2
🔄 Sticky assignment
📊 TCP connection: Active
⏱️ Active tasks: 3]
end
subgraph "🖥️ Executor 2"
D[📥 Consumer Pool
🔄 LRU Cache: 16 consumers
📊 Hit rate: 90%
⏱️ Eviction rate: 2/hour]
E[📦 payments-0 → Consumer3
🔄 High reuse frequency
📊 Throughput: 12k rec/sec
⏱️ Uptime: 45 min]
F[📦 orders-2 → Consumer4
🔄 Moderate usage
📊 Fetch size: 1MB
⏱️ Idle time: 5 min]
end
subgraph "🖥️ Executor 3"
G[📥 Consumer Pool
🔄 LRU Cache: 16 consumers
📊 Hit rate: 82%
⏱️ Memory usage: 50MB]
H[📦 inventory-0 → Consumer5
🔄 Low frequency partition
📊 Batch size: 1k records
⏱️ Last active: 10 min]
end
end
subgraph "📊 Pool Management Metrics"
I[🎯 Assignment Strategy
📊 Hash-based distribution
🔄 Consistent assignment
⚡ Load balancing]
J[📈 Performance Impact
📊 Connection overhead: -60%
📊 Throughput increase: +40%
🔄 Latency reduction: -25%]
K[💾 Memory Management
📊 Pool size: 16 per executor
📊 Memory per consumer: 3MB
🔄 Total overhead: 144MB]
end
A --> B
A --> C
D --> E
D --> F
G --> H
style A fill:#e3f2fd
style D fill:#e8f5e8
style G fill:#fff3e0
style I fill:#fce4ec
style J fill:#c8e6c9
style K fill:#ffebee
This diagram illustrates how Spark manages Kafka consumer connections, like a restaurant managing specialized cooking stations:
Consumer Pool Architecture (Like Restaurant Stations):
Executor 1 (Like Main Kitchen):
Consumer Pool: A storage area for 16 specialized cooking tools
Specific Assignments:
orders-0 → Consumer1: Dedicated pasta station
orders-1 → Consumer2: Dedicated pizza station
Executor 2 (Like Dessert Kitchen):
Consumer Pool: Specialized for dessert making
Specific Assignments:
payments-0 → Consumer3: High-volume ice cream station
orders-2 → Consumer4: Moderate-volume cake station
Executor 3 (Like Specialty Kitchen):
Consumer Pool: Handles specialty items
Specific Assignments:
Pool Management Benefits:
Assignment Strategy:
Performance Impact:
Memory Management:
Real-World Analogy:
Imagine a restaurant where:
This consumer pool strategy is crucial for high-performance Kafka processing because establishing new connections is expensive, but reusing existing connections is very fast.
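The pool behavior can be modeled as a small LRU cache keyed by topic-partition. This is a toy sketch of the idea (sticky assignment, reuse across batches, eviction of the least recently used entry), not Spark's actual pool implementation:

```python
from collections import OrderedDict

class ConsumerPool:
    """Toy LRU cache of Kafka consumers, keyed by (topic, partition)."""
    def __init__(self, capacity=16):
        self.capacity = capacity
        self.pool = OrderedDict()
        self.hits = self.misses = 0

    def acquire(self, topic, partition):
        key = (topic, partition)
        if key in self.pool:
            self.hits += 1
            self.pool.move_to_end(key)         # mark as most recently used
        else:
            self.misses += 1                   # expensive: new connection
            if len(self.pool) >= self.capacity:
                self.pool.popitem(last=False)  # evict least recently used
            self.pool[key] = f"consumer-for-{topic}-{partition}"
        return self.pool[key]

pool = ConsumerPool(capacity=2)       # tiny capacity to show eviction
pool.acquire("orders", 0)             # miss: open connection
pool.acquire("orders", 0)             # hit: reused across batches
pool.acquire("orders", 1)             # miss
pool.acquire("payments", 0)           # miss: evicts ("orders", 0)
```

With sticky executor assignment, the same task tends to land on the same executor each batch, so the hit path dominates and connection setup cost is amortized away.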
graph TB
subgraph "📊 Partition Size Analysis"
A[📏 Partition Size Factors
📊 Record count
💾 Record size
⏱️ Processing time
📊 Memory usage]
B[❌ Too Small Partitions
📊 < 10k records
⏱️ High task overhead
📊 Poor resource utilization
🔄 Excessive coordinator load]
C[❌ Too Large Partitions
📊 > 500k records
💾 Memory pressure
⏱️ Long task duration
🔄 Straggler tasks]
D[✅ Optimal Partitions
📊 50k-200k records
💾 50-200MB memory
⏱️ 1-5 min duration
🔄 Balanced load]
end
subgraph "🎯 Sizing Recommendations"
E[📊 Formula: Optimal Size
🧮 Records = Available Memory / (Record Size × 2)
📊 Example: 1GB / (1KB × 2) = 500k records
⚡ Safety factor: 2x for buffers]
F[⚙️ Configuration Tuning
📊 maxRecordsPerPartition: 100k
📊 minPartitions: CPU cores × 2
🔄 Dynamic adjustment based on load]
G[📈 Performance Impact
📊 Throughput: Linear with partitions
📊 Latency: Inverse with size
🔄 Sweet spot: 100k records]
end
subgraph "🔧 Troubleshooting Guide"
H[🚨 Memory Issues
📊 Reduce maxRecordsPerPartition
💾 Increase executor memory
🔄 Enable off-heap storage]
I[⏱️ Performance Issues
📊 Increase parallelism
🔄 Check consumer reuse
📊 Monitor partition skew]
J[🔄 Load Balancing
📊 Increase minPartitions
⚖️ Monitor task duration
📊 Check preferred locations]
end
A --> B
A --> C
A --> D
D --> E
E --> F
F --> G
B --> H
C --> H
D --> I
G --> J
style A fill:#e3f2fd
style D fill:#c8e6c9
style E fill:#e8f5e8
style H fill:#ffcdd2
style I fill:#fff3e0
style J fill:#dcedc8
This diagram explains how to choose the right partition size, like determining the optimal workload for employees:
Partition Size Factors (Top Center):
Think of this like managing a call center:
Three Scenarios:
Too Small Partitions (Red - Left):
Like having agents handle only 1-2 calls per hour:
Too Large Partitions (Red - Right):
Like having agents handle 100+ calls per hour:
Optimal Partitions (Green - Center):
Like having agents handle 20-50 calls per hour:
Sizing Recommendations:
Formula for Optimal Size:
Optimal records per partition = Available Memory / (Record Size × Safety Factor)
Example: 1GB / (1KB × 2) = 500k records maximum
Think of this like: "How many phone calls can an agent handle given their workspace (memory) and the complexity of calls (record size)?"
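The sizing formula above is a one-liner; the safety factor of 2 leaves headroom for fetch buffers and deserialization overhead (the formula is the document's rule of thumb, not something Spark enforces):

```python
def optimal_records_per_partition(available_memory_bytes,
                                  record_size_bytes,
                                  safety_factor=2):
    """Rule of thumb: records = memory / (record size x safety factor)."""
    return available_memory_bytes // (record_size_bytes * safety_factor)

# 1 GB of memory, 1 KB records (decimal units, as in the example above)
limit = optimal_records_per_partition(1_000_000_000, 1_000)
print(limit)  # 500000 records maximum
```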
Configuration Tuning:
Performance Impact:
Troubleshooting Guide:
Memory Issues (Red):
When agents run out of workspace:
Performance Issues (Yellow):
When work is too slow:
Load Balancing (Green):
When some agents are overworked:
This approach ensures optimal resource utilization while maintaining predictable performance.
graph TB
subgraph "📊 Real-time Metrics"
A[📈 Throughput Metrics
📊 Records/sec: 50k
📊 Batches/min: 12
📊 Lag: 2.5k records
⏱️ Latency: P99 < 200ms]
B[💾 Resource Utilization
📊 CPU: 75%
📊 Memory: 2.8GB/4GB
📊 Network: 100MB/sec
🔄 Disk I/O: 50MB/sec]
C[🔄 Consumer Metrics
📊 Pool hit rate: 85%
📊 Connection reuse: 90%
📊 Fetch latency: 15ms
📊 Poll frequency: 100/sec]
end
subgraph "⚠️ Alert Conditions"
D[🚨 Performance Alerts
📊 Throughput < 30k rec/sec
📊 Latency > 1000ms
📊 Error rate > 1%
🔄 Consumer lag > 10k]
E[💾 Resource Alerts
📊 Memory usage > 90%
📊 GC time > 200ms
📊 CPU usage > 90%
🔄 Disk space < 10GB]
F[🔄 Kafka Alerts
📊 Broker down
📊 Partition offline
📊 Replication lag > 5 min
🔄 Topic deletion]
end
subgraph "🔧 Troubleshooting Actions"
G[📊 Scale Out
🔄 Increase executors
📊 Add more partitions
⚡ Boost parallelism
📊 Load balancing]
H[⚙️ Configuration Tuning
📊 Adjust partition size
🔄 Optimize batch size
📊 Tune consumer props
⚡ Memory allocation]
I[🏪 Kafka Optimization
📊 Increase retention
🔄 Partition rebalancing
📊 Broker scaling
⚡ Network tuning]
end
A --> D
B --> E
C --> F
D --> G
E --> H
F --> I
style A fill:#e3f2fd
style B fill:#e8f5e8
style C fill:#c8e6c9
style D fill:#ffcdd2
style E fill:#fff3e0
style F fill:#fce4ec
style G fill:#dcedc8
style H fill:#f3e5f5
style I fill:#e0f2f1
This diagram shows a comprehensive monitoring system, like a hospital's patient monitoring dashboard:
Real-time Metrics (Top Section - Like Vital Signs):
Throughput Metrics (Blue):
Like monitoring a patient's heart rate and blood pressure:
Resource Utilization (Green):
Like monitoring organ function:
Consumer Metrics (Light Green):
Like monitoring specific treatment effectiveness:
Alert Conditions (Middle Section - Like Medical Alerts):
Performance Alerts (Red):
Like critical vital signs:
Resource Alerts (Yellow):
Like warning signs:
Kafka Alerts (Pink):
Like external system failures:
Troubleshooting Actions (Bottom Section - Like Medical Treatments):
Scale Out (Green):
Like adding more medical staff:
Configuration Tuning (Purple):
Like adjusting medication dosages:
Kafka Optimization (Light Blue):
Like improving hospital infrastructure:
Monitoring Philosophy:
This comprehensive monitoring approach ensures that both the symptoms (performance metrics) and the causes (resource constraints, external dependencies) are tracked and addressed systematically.
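The performance-alert box above translates directly into a threshold check. The metric names and limits here are illustrative values taken from the diagram, not Spark metric keys:

```python
# (kind, limit): "min" alerts when the value drops below the limit,
# "max" alerts when it rises above it.
THRESHOLDS = {
    "throughput_rec_per_sec": ("min", 30_000),
    "latency_ms":             ("max", 1_000),
    "error_rate_pct":         ("max", 1.0),
    "consumer_lag_records":   ("max", 10_000),
}

def check_alerts(metrics):
    """Return the names of metrics that violate their thresholds."""
    alerts = []
    for name, (kind, limit) in THRESHOLDS.items():
        value = metrics.get(name)
        if value is None:
            continue   # metric not reported this interval
        if (kind == "min" and value < limit) or \
           (kind == "max" and value > limit):
            alerts.append(name)
    return alerts

healthy = {"throughput_rec_per_sec": 50_000, "latency_ms": 200,
           "error_rate_pct": 0.1, "consumer_lag_records": 2_500}
degraded = {"throughput_rec_per_sec": 25_000, "consumer_lag_records": 15_000}
```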
graph TB
subgraph "🎯 Production Configuration"
A[⚙️ Core Settings
📊 minPartitions: CPU cores × 2
📊 maxRecordsPerPartition: 100k
📊 maxOffsetsPerTrigger: 1M
🔄 failOnDataLoss: true]
B[🏪 Kafka Consumer Props
📊 fetch.max.bytes: 50MB
📊 max.partition.fetch.bytes: 10MB
📊 session.timeout.ms: 30000
🔄 enable.auto.commit: false]
C[💾 Memory Settings
📊 spark.executor.memory: 4g
📊 spark.executor.memoryFraction: 0.7
📊 spark.sql.adaptive.enabled: true
🔄 spark.sql.adaptive.coalescePartitions: true]
end
subgraph "📊 Performance Tuning"
D[🔄 Parallelism Tuning
📊 Target: 2-4 tasks per CPU core
📊 Partition size: 50-200k records
📊 Task duration: 1-5 minutes
⚡ Avoid micro-batching]
E[📈 Throughput Optimization
📊 Batch size: Balance latency vs throughput
📊 Consumer prefetch: 2-5 batches
📊 Compression: Enable LZ4
🔄 Serialization: Use Kryo]
F[🛡️ Reliability Settings
📊 Checkpointing: Every 10 batches
📊 WAL: Enabled for fault tolerance
📊 Retries: 3 with exponential backoff
🔄 Idempotent producers]
end
subgraph "🔧 Monitoring Setup"
G[📊 Key Metrics
📊 Input rate vs processing rate
📊 Batch processing time
📊 Consumer lag by partition
🔄 Memory usage patterns]
H[⚠️ Alert Thresholds
📊 Processing delay > 2 minutes
📊 Consumer lag > 100k records
📊 Error rate > 0.1%
🔄 Memory usage > 85%]
I[🎯 Capacity Planning
📊 Peak load: 3x average
📊 Retention: 7 days minimum
📊 Scaling headroom: 50%
🔄 Disaster recovery: 2x regions]
end
A --> D
B --> E
C --> F
D --> G
E --> H
F --> I
style A fill:#e3f2fd
style D fill:#e8f5e8
style G fill:#c8e6c9
style B fill:#fff3e0
style E fill:#fce4ec
style H fill:#ffcdd2
style C fill:#dcedc8
style F fill:#f3e5f5
style I fill:#e0f2f1
This diagram outlines proven configurations and practices, like a comprehensive operations manual:
Production Configuration (Top Section - Like Basic Operating Procedures):
Core Settings (Blue):
Like fundamental business rules:
Kafka Consumer Props (Yellow):
Like supplier relationship settings:
Memory Settings (Green):
Like resource allocation policies:
Performance Tuning (Middle Section - Like Optimization Guidelines):
Parallelism Tuning (Green):
Like workload distribution strategy:
Throughput Optimization (Pink):
Like efficiency improvements:
Reliability Settings (Purple):
Like business continuity measures:
Monitoring Setup (Bottom Section - Like Quality Assurance):
Key Metrics (Light Green):
Like business KPIs:
Alert Thresholds (Red):
Like warning systems:
Capacity Planning (Light Blue):
Like strategic planning:
Implementation Philosophy:
These best practices represent years of experience running Kafka-based Spark applications in production environments, providing a solid foundation for reliable, high-performance data processing.
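The core settings above map onto Kafka source options roughly like this hedged PySpark sketch. The broker address and topic names are placeholders, and option availability (notably `maxRecordsPerPartition`) varies by Spark version, so treat this as a template rather than a drop-in configuration:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-offset-demo").getOrCreate()

stream = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker1:9092")  # placeholder
          .option("subscribe", "orders,payments")             # placeholder topics
          .option("startingOffsets", "earliest")
          .option("minPartitions", "8")               # parallelism floor
          .option("maxOffsetsPerTrigger", "1000000")  # per-batch rate limit
          .option("failOnDataLoss", "true")           # strict mode
          .load())
```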